package com.offcn.bigdata.spark.sql.p2

import org.apache.spark.sql.SparkSession

/**
  * SparkSQL的函数操作
  * 函数，和java、scala中的函数或者方法的概念一样，就是对某一特定功能的封装。
  *  函数通常进行如下的分类：
  *     数学函数
  *         abs
  *         ceil
  *         floor
  *         rand
  *         round
  *         number_format
  *         format
  *     条件函数
  *         if
  *         case when
  *     统计聚合函数
  *
  *     开窗函数
  *         row_number() over
  *         sum() over()
  *     处理特殊类型的函数：
  *         String  要掌握
  *         Date    要掌握
  *         cast  类型转换
  *  从功能上分类：
  *     UDF     ：一路输入，一路输出，比如len, toUpper, year
  *     UDAF    ：多路输入，一路输出，比如sum，count
  *     UDTF    ：一路输入，多路输出，比如explode
  *
  *  案例：sql版本的wordcount        记忆
  *     select
  *         tmp.word,
  *         count(1) count
  *     from (
  *         select
  *             explode(split(line, '\\s+')) word
  *         from wc
  *     ) tmp
  *     group by tmp.word;
  *  开窗函数：分组topN---row_number() 记忆
  *  hive> select * from score;
        *OK
        *score.sid	score.cid	score.score
        *01	01	80
        *01	02	90
        *01	03	99
        *02	01	70
        *02	02	60
        *02	03	80
        *03	01	80
        *03	02	80
        *03	03	80
        *04	01	50
        *04	02	30
        *04	03	20
        *05	01	76
        *05	02	87
        *06	01	31
        *06	03	34
        *07	02	89
        *07	03	98
        *Time taken: 0.147 seconds, Fetched: 18 row(s)
      *hive> select * from course;
        *OK
        *course.id	course.name	course.tid
        *01	语文	02
        *02	数学	01
        *03	英语	03
        *Time taken: 0.255 seconds, Fetched: 3 row(s)
      *hive> select * from student;
        *OK
        *student.id	student.name	student.birthday	student.sex
        *01	赵雷	1990-01-01	男
        *02	钱电	1990-12-21	男
        *03	孙风	1990-05-20	男
        *04	李云	1990-08-06	男
        *05	周梅	1991-12-01	女
        *06	吴兰	1992-03-01	女
        *07	郑竹	1989-07-01	女
        *08	王菊	1990-01-20	女
    *需求：求每一门课程成绩排名前3位的学生和成绩
        *select
            *
        *from (
            *select
                *,
                *row_number() over(partition by cid order by score desc) rank
            *from score
        *) tmp
        *where tmp.rank < 4;
 **
 需求：随机返回表中的N条记录
        *select * from student order by rand() limit 3;
        *select *, rand() random from student order by random limit 3;
  */
object _01SparkSQLWithFunctioOps {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
                            .appName("_01SparkSQLWithFunctioOps")
                            .master("local[*]")
                            .getOrCreate()
        import spark.implicits._
//        2. 需要注册自定义的内容
//        spark.udf.register("myLen", (word: String) => myLength(word))
        spark.udf.register[Int, String]("myLen", myLength)
        /*
            自定义一个udf 模拟string的length函数
            步骤：
                1. 写一个类去扩展一个trait或者类，复写其中的方法，或者写一个方法完成业务
                2. 需要注册自定义的内容
                3. 直接使用即可
         */
        val df = spark.createDataset(List(
            "hello", "want", "let", "dinner", "lunch"
        )).toDF("word")

        df.createOrReplaceTempView("tmp")

        //3. 直接使用即可
        spark.sql(
            """
              |select
              |  word,
              |  length(word) len,
              |  myLen(word) my_len
              |from tmp
            """.stripMargin)
            .show()

        spark.stop()
    }
    //1. 写一个类去扩展一个trait或者类，复写其中的方法，或者写一个方法完成业务
    def myLength(str: String) = if(null == str) 0 else str.length
}
